Splunk extractor #426
Conversation
Codecov Report: all modified and coverable lines are covered by tests ✅

@@            Coverage Diff             @@
##              main     #426      +/-   ##
==========================================
+ Coverage    98.21%   98.25%   +0.03%
==========================================
  Files          152      154       +2
  Lines         6111     6249     +138
==========================================
+ Hits          6002     6140     +138
  Misses         109      109
return headers

@property
def _normalized_query(self) -> str:
Note: for base queries the search command is implied:
index=index_a
But for subqueries:
index=index_a | join [search index=index_b]
the search keyword is required.
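A minimal sketch of how the normalization could account for that; the exact rules and the self.query attribute are assumptions for illustration, not the PR's implementation:

```python
@property
def _normalized_query(self) -> str:
    # Splunk treats a top-level query as an implicit `search`, but a query
    # that already starts with `search` or a generating command (`| ...`)
    # should be left alone.
    query = self.query.strip()
    if query.startswith("|") or query.lower().startswith("search "):
        return query
    return f"search {query}"
```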
try:
    root = ET.fromstring(response.text)
    for elem in root.iter():
        if (
            "dispatchState" in elem.tag
            or elem.get("name") == "dispatchState"
        ):
            dispatch_state = elem.text
            break
except Exception as e:
    self.logger.warning(
        "Failed to parse job status",
        extra={"error": str(e), "search_id": search_id},
    )
I see there is a pattern here between parsing things as JSON and parsing them as XML. I would recommend creating a parser class that takes an arbitrary object of one of the two types and gets the corresponding object a bit more declaratively:

class SplunkResponseParser:
    def parse(object: Json | XML):
        ...
To riff on this - create a parser interface that you can call parse on and it returns a consistent result. Switch on the return type and create an instance of either a JsonSplunkResponseParser or an XmlSplunkResponseParser. This can be done as a factory method on SplunkResponseParser.
This is a refactoring strategy called "replace conditional with polymorphism".
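A minimal sketch of that factory-plus-polymorphism shape; all class and method names here are hypothetical, and the parsing rules are lifted from the snippets in this thread:

```python
import json
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from typing import Optional


class SplunkResponseParser(ABC):
    """Extracts the dispatch state from a raw Splunk job-status response."""

    @abstractmethod
    def parse_dispatch_state(self, body: str) -> Optional[str]: ...

    @classmethod
    def for_content_type(cls, content_type: str) -> "SplunkResponseParser":
        # Factory method: pick the concrete parser from the response content type.
        if "json" in content_type:
            return JsonSplunkResponseParser()
        return XmlSplunkResponseParser()


class JsonSplunkResponseParser(SplunkResponseParser):
    def parse_dispatch_state(self, body: str) -> Optional[str]:
        entries = json.loads(body).get("entry", [])
        return entries[0].get("content", {}).get("dispatchState") if entries else None


class XmlSplunkResponseParser(SplunkResponseParser):
    def parse_dispatch_state(self, body: str) -> Optional[str]:
        for elem in ET.fromstring(body).iter():
            if "dispatchState" in elem.tag or elem.get("name") == "dispatchState":
                return elem.text
        return None
```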
+1 to the interface idea.
job_status = response.json()
dispatch_state = (
    job_status.get("entry", [{}])[0]
    .get("content", {})
    .get("dispatchState")
)
I think there are heavy assumptions here about the shape of the JSON. Do we know why we take the first element of the entry list? I worry that a parallel job started by a different Splunk extractor could end up at the top of that list. Or can a single search have multiple jobs? Why is the first one the one we care about?
Maybe:

for entry in job_status.get("entry", []):
    ...

i.e. check all entries for the condition we care about, or find the one entry we are actually looking for.
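A sketch of what that could look like, assuming each entry exposes its job's sid under content (field names based on the Splunk jobs API, so worth verifying):

```python
from typing import Optional


def find_dispatch_state(job_status: dict, search_id: str) -> Optional[str]:
    # Pick the entry that belongs to our search job instead of blindly
    # taking the first element of the "entry" list.
    for entry in job_status.get("entry", []):
        content = entry.get("content", {})
        if content.get("sid") == search_id:
            return content.get("dispatchState")
    return None
```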
await asyncio.sleep(2)  # Wait 2 seconds before checking again
wait_count += 2
I think it is more standard to do something akin to:

attempts = 0
while should_continue:
    should_continue = success_condition AND attempts < MAX_ATTEMPTS
    attempts += 1
in #425 we're discussing adding a library like tenacity to handle things like this as well.
Link to tenacity
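For reference, a rough sketch of what the polling could look like with tenacity; the helper callable is hypothetical and not part of this PR:

```python
from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed


@retry(
    retry=retry_if_result(lambda state: state not in ("DONE", "FAILED")),
    wait=wait_fixed(2),
    stop=stop_after_delay(300),
)
async def poll_dispatch_state(get_dispatch_state) -> str:
    # get_dispatch_state is a hypothetical async callable that fetches the
    # current dispatchState; tenacity re-invokes this until DONE/FAILED
    # or until the 300-second stop condition is hit.
    return await get_dispatch_state()
```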
elif dispatch_state == "FAILED":
    raise RuntimeError(f"Search job failed: {search_id}")

await asyncio.sleep(2)  # Wait 2 seconds before checking again
You can also replace this with a global variable SPLUNK_STATUS_CHECK_PERIOD_SECONDS.
I would very much prefer this. My mantra is "prefer no magic numbers/constant strings". Ideally, most constants should be extracted into config or something like it, but at the very least make them module level constants until it's clear which ones need to change often.
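For example (a two-line sketch; the constant name is taken from the comment above):

```python
# Module-level constant, rather than a bare 2 sprinkled through the polling loop.
SPLUNK_STATUS_CHECK_PERIOD_SECONDS = 2
```

The loop would then call await asyncio.sleep(SPLUNK_STATUS_CHECK_PERIOD_SECONDS) and advance wait_count by the same constant.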
async def _wait_for_job_completion(
    self, client: AsyncClient, search_id: str, max_wait_seconds: int = 300
):
Try typing the return type. Instead of nesting try/except blocks, you can return a completion state of True or False in order to hand the expected state off to the overarching function.
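A sketch of the typed version; _get_dispatch_state is a hypothetical helper, and SPLUNK_STATUS_CHECK_PERIOD_SECONDS is the constant suggested above:

```python
async def _wait_for_job_completion(
    self, client: AsyncClient, search_id: str, max_wait_seconds: int = 300
) -> bool:
    """Return True once the job is DONE, False if it times out; raise on FAILED."""
    waited = 0
    while waited < max_wait_seconds:
        state = await self._get_dispatch_state(client, search_id)
        if state == "DONE":
            return True
        if state == "FAILED":
            raise RuntimeError(f"Search job failed: {search_id}")
        await asyncio.sleep(SPLUNK_STATUS_CHECK_PERIOD_SECONDS)
        waited += SPLUNK_STATUS_CHECK_PERIOD_SECONDS
    return False
```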
"search": self._normalized_query, | ||
"earliest_time": self.earliest_time, | ||
"latest_time": self.latest_time, | ||
"max_count": str(self.max_count), |
Why convert to a string here? Maybe have the interface expect a string instead.
class SplunkExtractor(Extractor):
    @classmethod
    def from_file_data(
        cls,
        base_url: str,
        query: str,
        auth_token: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        earliest_time: str = "-24h",
        latest_time: str = "now",
        verify_ssl: bool = True,
        request_timeout_seconds: int = 300,
        max_count: int = 10000,
        app: str = "search",
        user: Optional[str] = None,
        chunk_size: int = 1000,
    ) -> "SplunkExtractor":
Try to separate the functionality into a Client and have extract_records use it.
Agreed
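A rough sketch of that split, assuming nodestream's Extractor base class (the import path and all method bodies here are placeholders):

```python
from typing import Any, AsyncIterator, Optional

from nodestream.pipeline import Extractor  # assumed import path


class SplunkClient:
    """Owns the HTTP details: auth, endpoints, job creation, polling, paging."""

    def __init__(self, base_url: str, auth_token: Optional[str] = None, verify_ssl: bool = True):
        self.base_url = base_url
        self.auth_token = auth_token
        self.verify_ssl = verify_ssl

    async def search(self, query: str, earliest_time: str, latest_time: str) -> AsyncIterator[dict]:
        """Create a search job, wait for it to finish, and yield result records."""
        raise NotImplementedError  # sketch only
        yield {}  # unreachable; marks this as an async generator


class SplunkExtractor(Extractor):
    """Owns the pipeline concerns: configuration, checkpoints, record shaping."""

    def __init__(self, client: SplunkClient, query: str, earliest_time: str = "-24h", latest_time: str = "now"):
        self.client = client
        self.query = query
        self.earliest_time = earliest_time
        self.latest_time = latest_time

    async def extract_records(self) -> AsyncIterator[Any]:
        async for record in self.client.search(self.query, self.earliest_time, self.latest_time):
            yield record
```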
"count": str(self.chunk_size), | ||
"offset": str(self.offset), |
Why are these strings? Do we need to change the input parameters to just take stringified integers?
if response.status_code != 200:
    raise HTTPStatusError(
        f"Failed to get job results: {response.status_code}",
        request=response.request,
        response=response,
    )
What if the error is a 504? It could be the case that we hit the endpoint often enough that they try to rate-limit us.
IMO (a rough sketch of this policy follows the list):
- 401/403/404 - stop trying
- 2xx are mostly ok. Limiting to 200 may be a problem, but it rarely is.
- 3xx should almost never be seen, because they should be handled by the underlying HTTP library, which does whatever forwarding is necessary
- 4xx/5xx - log and try again with backoff
- Connection/DNS error - log and try again with backoff
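A minimal sketch of that policy as a classification helper; the names and exact status buckets are only illustrations of the list above:

```python
from enum import Enum

import httpx


class RetryDecision(Enum):
    OK = "ok"
    RETRY = "retry"        # log and try again with backoff
    GIVE_UP = "give_up"    # stop trying


def classify_response(response: httpx.Response) -> RetryDecision:
    status = response.status_code
    if status in (401, 403, 404):
        return RetryDecision.GIVE_UP   # auth or identity problems won't fix themselves
    if 200 <= status < 300:
        return RetryDecision.OK        # 2xx are mostly ok
    # 3xx should already be handled by the HTTP library's redirect support;
    # anything else (4xx/5xx, including 429/504) gets retried with backoff.
    return RetryDecision.RETRY
```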
Also, I feel like this indicates there might be a way to make a more pluggable "client" idea once we figure out a unified retry logic. 🤔
We have an attempt to do this in our internal library but have not had the time to introduce it to nodestream. It does everything from status check handling, retrying, and error handling to safe JSON loading.
Also note that there is a Splunk client supported by Splunk itself. Not sure there is much distinguishing the two, but it might make a lot of the abstraction easier, like the job handling.
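For comparison, a rough sketch of the same job flow with Splunk's own Python SDK (splunklib); treat the exact calls as assumptions to verify against the splunk-sdk docs, and the host/credentials as placeholders:

```python
import splunklib.client as client
import splunklib.results as results

service = client.connect(
    host="splunk.example.com",
    port=8089,
    username="admin",
    password="changeme",  # placeholder credentials
)

# The SDK wraps job creation and status handling for us.
job = service.jobs.create("search index=index_a", earliest_time="-24h", latest_time="now")
while not job.is_done():
    job.refresh()

for result in results.JSONResultsReader(job.results(output_mode="json")):
    if isinstance(result, dict):  # skip diagnostic Message objects
        print(result)
```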
async def resume_from_checkpoint(self, checkpoint_object):
    """Resume extraction from a checkpoint."""
    if checkpoint_object:
This check is not required; nodestream does this for you. You should only be called here with a non-None object.
def get_jobs_endpoint(self) -> str:
    """Get the Splunk jobs endpoint."""
    return f"{self.base_url}/servicesNS/{self.user}/{self.app}/search/jobs"
This can be computed in the constructor; I don't see a clear reason to regenerate this string on every call.
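For example (a fragment of __init__, with the parameter list trimmed to the relevant pieces):

```python
def __init__(self, base_url: str, user: str, app: str) -> None:
    self.base_url = base_url
    self.user = user
    self.app = app
    # Computed once, instead of rebuilding the f-string on every call.
    self.jobs_endpoint = f"{base_url}/servicesNS/{user}/{app}/search/jobs"
```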
"""Get the results endpoint for a specific search job.""" | ||
return f"{self.base_url}/servicesNS/{self.user}/{self.app}/search/jobs/{search_id}/results" | ||
|
||
async def _create_search_job(self, client: AsyncClient) -> str: |
This function has a lot going on:
- formulates a request body
- makes the request
- parses the response in one of two different possible return types.
For a simpler request, that may be fine, but given the leg work required, it's easy to get lost in this function. I'd recommend breaking this down. Make this function tell a "story" while other functions describe the details.
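A sketch of that shape; the helper names (and self._headers / self.jobs_endpoint) are hypothetical:

```python
async def _create_search_job(self, client: AsyncClient) -> str:
    # The "story": build the request, send it, pull the search id out of the reply.
    body = self._build_search_job_body()
    response = await client.post(self.jobs_endpoint, data=body, headers=self._headers)
    return self._parse_search_id(response)
```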
Also, shouldn't this be search/v2/jobs/{search_id}/results, or am I looking at the wrong docs?
Actually, do we want to support a streaming Splunk search result from search/v2/jobs/export?
Sorry, more splunk admin/hacker knowledge is being unlocked:
when we say "splunk extractor" we should consider supporting (and being clear about which we're supporting):
- ad-hoc "streaming" queries (time-bound, no job-id required)
- running a "job" and then getting the results (requires creating the job and then getting the results, what this PR covers)
- accessing "scheduled" search results (by schedule name)
)
return search_id

async def _wait_for_job_completion(
My IDE flags this as having too much "cognitive complexity" (a.k.a. too many nested branches and loops).
It would be good to refactor chunks of this into smaller functions to reduce the amount of brainpower required to figure out what's going on.
)
except (json.JSONDecodeError, KeyError, IndexError):
    # Try XML parsing
    import xml.etree.ElementTree as ET
It would be more "pythonic" to write:

from xml.etree import ElementTree

instead of:

import xml.etree.ElementTree as ET

Typically the community reserves aliases for modules (and almost always lowercase only):

import pandas as pd
import numpy as np
mock_response = mocker.MagicMock()
mock_response.status_code = 201
mock_response.json.return_value = {"sid": "json123"}
We should prefer using responses or pytest-httpx to handle this mocking; they're more careful about rejecting unexpected calls and emulating an actual HTTP call.
# Helper property tests
def test_splunk_extractor_auth_property_with_token(splunk_extractor):
    assert_that(splunk_extractor._auth, equal_to(None))  # Token goes in header
In general I would avoid testing internal/private properties and functions, to avoid making tests that rely on implementation details. These tests can be handled by responses and pytest-httpx by having those mock libraries watch for the expected auth headers.
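A sketch of that style with pytest-httpx (assumes pytest-asyncio is installed; the URL, token, and request are placeholders rather than this PR's actual fixtures):

```python
import httpx
import pytest


@pytest.mark.asyncio
async def test_search_job_request_carries_bearer_token(httpx_mock):
    # The mock only matches when the expected Authorization header is sent,
    # so auth is verified through the actual HTTP call rather than a private property.
    httpx_mock.add_response(
        url="https://splunk.example.com:8089/servicesNS/nobody/search/search/jobs",
        method="POST",
        json={"sid": "json123"},
        status_code=201,
        match_headers={"Authorization": "Bearer test-token"},
    )

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://splunk.example.com:8089/servicesNS/nobody/search/search/jobs",
            headers={"Authorization": "Bearer test-token"},
        )

    assert response.json()["sid"] == "json123"
```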
assert_that(results, has_length(3))
assert_that(
    results[0],
    has_entries(
        {
            "_time": "2023-01-01T10:00:00",
            "host": "server1",
            "message": "Login successful",
        }
    ),
)
Why not assert the whole result? e.g. replace the assertions above with:

results == [
    {
        "_time": "2023-01-01T10:00:00",
        "host": "server1",
        "message": "Login successful",
    },
    {...},
    {...},
]
# Should handle gracefully and return empty results
assert_that(results, has_length(0))
assert_that(splunk_extractor.is_done, equal_to(True))
Hamcrest is great IMO for lists, but it just feels ugly for equalities. This is purely me being a hater of hamcrest. Suggested change:

assert splunk_extractor.is_done